1) 코드

데이터 확인 1: 결측치 전체 확인

코드
import pandas as pd
import numpy as np

df_building_original = pd.read_csv("./Data/건축물대장_통합.csv")

# 주요 컬럼별 결측치 개수 확인
print("대지위치 결측:", df_building_original["대지위치"].isna().sum())
print("지상층수 결측:", df_building_original["지상층수"].isna().sum())
print("높이(m) 결측:", df_building_original["높이(m)"].isna().sum())
print("구조코드명 결측:", df_building_original["구조코드명"].isna().sum())
print("기타구조 결측:", df_building_original["기타구조"].isna().sum())
print("사용승인년도 결측:", df_building_original["사용승인년도"].isna().sum())
print("위도 결측:", df_building_original["위도"].isna().sum())
print("경도 결측:", df_building_original["경도"].isna().sum())
print("위도/경도 모두 결측:", (df_building_original["위도"].isna() & df_building_original["경도"].isna()).sum())
대지위치 결측: 0
지상층수 결측: 0
높이(m) 결측: 0
구조코드명 결측: 66
기타구조 결측: 511
사용승인년도 결측: 15595
위도 결측: 9669
경도 결측: 9669
위도/경도 모두 결측: 9669

데이터 확인 2-1: 위경도 결측치 중 구조코드명별 퍼센트

코드
# 위경도 결측치 중 구조코드명별 퍼센트
df_building_filter = df_building_original.loc[df_building_original["위도"].isna() & df_building_original["경도"].isna(), :]

df_building_grouped = (
    df_building_filter.groupby('구조코드명', dropna=False)
    .size()
    .rename('수')
    .reset_index()
)
df_building_grouped['퍼센트'] = (df_building_grouped['수'] / len(df_building_filter) * 100).round(2)
df_building_grouped = df_building_grouped.sort_values(['수','구조코드명'], ascending=[False, True], ignore_index=True)
df_building_grouped
구조코드명 퍼센트
0 일반목구조 4318 44.66
1 블록구조 2348 24.28
2 벽돌구조 1464 15.14
3 철근콘크리트구조 699 7.23
4 기타조적구조 286 2.96
5 일반철골구조 192 1.99
6 경량철골구조 160 1.65
7 석구조 64 0.66
8 강파이프구조 44 0.46
9 목구조 26 0.27
10 기타강구조 14 0.14
11 철골철근콘크리트구조 11 0.11
12 기타구조 10 0.10
13 프리케스트콘크리트구조 10 0.10
14 NaN 9 0.09
15 통나무구조 5 0.05
16 철골콘크리트구조 4 0.04
17 조적구조 3 0.03
18 기타콘크리트구조 1 0.01
19 시멘트블럭조 1 0.01

데이터 확인 2-2: 위경도 결측치 중 일반목구조 비율

코드
# 위경도 결측치 중 일반목구조가 전체 일반목구조에 어느정도 해당하는지
original_wooden_structure = ((df_building_original["구조코드명"] == "일반목구조") & (df_building_original["위도"].notna())).sum()
filter_wooden_structure = df_building_grouped.loc[df_building_grouped["구조코드명"] == "일반목구조", "수"].values[0]
(filter_wooden_structure / original_wooden_structure * 100).round(2)
17.63

위도 경도 컬럼 추가

코드
import os
import time
import json
import requests
import pandas as pd

# 기본 설정
KAKAO_REST_KEY = "f939970b0ab002e6aa011535f5388344"
KAKAO_URL = "https://dapi.kakao.com/v2/local/search/address.json"
HEADERS = {"Authorization": f"KakaoAK {KAKAO_REST_KEY}"}
REQUEST_INTERVAL = 0.15
CACHE_PATH = "./kakao_geocode_cache.json"


# 캐시 로드/세이브
def load_cache(path=CACHE_PATH):
    if os.path.exists(path):
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except:
            return {}
    return {}

def save_cache(cache, path=CACHE_PATH):
    try:
        with open(path, "w", encoding="utf-8") as f:
            json.dump(cache, f, ensure_ascii=False)
    except:
        pass

cache = load_cache()

# 카카오 지오코딩 함수
def kakao_geocode(address):
    if not isinstance(address, str) or not address.strip():
        return None

    addr = address.strip()
    if addr in cache:
        return cache[addr]

    params = {"query": addr}
    try:
        r = requests.get(KAKAO_URL, headers=HEADERS, params=params, timeout=10)
        if r.status_code == 200:
            data = r.json()
            docs = data.get("documents", [])
            if docs:
                d0 = docs[0]
                lon, lat = float(d0["x"]), float(d0["y"])
                result = {"lat": lat, "lon": lon}
            else:
                result = None
        else:
            result = None
    except:
        result = None

    cache[addr] = result
    save_cache(cache)
    time.sleep(REQUEST_INTERVAL)
    return result

# CSV 불러오기
df_building_original = pd.read_csv(
    "../Raw Data/건축물대장/건축물대장_대구광역시_종합.csv",
    sep=None, engine="python"
)

df_building_filter = df_building_original[[
    "대지위치", "지상층수", "지하층수", "높이(m)", "구조코드명", "기타구조", "주용도코드명", "비상용승강기수"
]].copy()

df_building_filter["사용승인년도"] = df_building_original["사용승인일"].astype(str).str.slice(0, 4)

# 1만개씩 나누기
df_building_temp = df_building_filter.loc[:10000, :].copy()

# 좌표 변환 적용
df_building_temp["위도"] = None
df_building_temp["경도"] = None

for i, addr in enumerate(df_building_temp["대지위치"].dropna().unique(), 1):
    geo = kakao_geocode(addr)
    if geo:
        df_building_temp.loc[df_building_temp["대지위치"] == addr, "위도"] = geo["lat"]
        df_building_temp.loc[df_building_temp["대지위치"] == addr, "경도"] = geo["lon"]
    if i % 500 == 0:
        print(f"{i} / {len(df_building_temp['대지위치'].dropna().unique)} 처리 완료")

# 저장
df_building_temp.to_csv(
    "../Raw Data/건축물대장_위도경도포함/1.csv",
    index=False, encoding="utf-8-sig"
)
print("저장 완료")

# 파일 합치기
files = [
    "../Raw Data/건축물대장_위도경도포함/건축물2_좌표.csv",
    "../Raw Data/건축물대장_위도경도포함/건축물대장1_1.csv",
    "../Raw Data/건축물대장_위도경도포함/건축물대장1_2.csv",
    "../Raw Data/건축물대장_위도경도포함/건축물대장1_3.csv",
    "../Raw Data/건축물대장_위도경도포함/건축물대장1_4.csv",
    "../Raw Data/건축물대장_위도경도포함/건축물대장1_5.csv",
    "../Raw Data/건축물대장_위도경도포함/건축물대장1_6.csv",
    "../Raw Data/건축물대장_위도경도포함/대구_건축물대장_2(6~80000).csv",
    "../Raw Data/건축물대장_위도경도포함/대구_건축물대장_all.csv",
    "../Raw Data/건축물대장_위도경도포함/건축물대장(30000~49999).csv"
]

# 뽑을 컬럼 목록
columns_to_keep = [
    "대지위치", "지상층수", "지하층수", "높이(m)", "구조코드명", "기타구조",
    "주용도코드명", "비상용승강기수", "사용승인년도", "위도", "경도"
]

dfs = []

for file in files:
    df = pd.read_csv(file, encoding="utf-8")
    
    existing_cols = [col for col in columns_to_keep if col in df.columns]
    df = df[existing_cols]
    
    dfs.append(df)

merged_df = pd.concat(dfs, ignore_index=True)

# 저장
merged_df.to_csv("./Data/건축물대장_통합.csv", index=False, encoding="utf-8-sig")

소화전/소방서 최소거리 컬럼 추가

코드
import pandas as pd
import numpy as np

# 데이터 불러오기
df_building = pd.read_csv('./Data/건축물대장_통합.csv')
df_hydrant = pd.read_csv('./Data/대구_소방장치_위치.csv')
df_firestation = pd.read_csv('./Data/대구_소방서_위치.csv')

# 거리계산 함수 정의
def haversine_min_distance(lat1, lon1, hy_lats, hy_lons):
    R = 6371000  # 지구 반지름 (m)
    lat1 = np.radians(lat1)
    lon1 = np.radians(lon1)
    dlat = hy_lats - lat1
    dlon = hy_lons - lon1
    a = np.sin(dlat / 2)**2 + np.cos(lat1) * np.cos(hy_lats) * np.sin(dlon / 2)**2
    c = 2 * np.arcsin(np.sqrt(a))
    distances = R * c
    return distances.min()

# 위경도 라디안화
hydrant_lats = np.radians(df_hydrant["위도"].values)
hydrant_lons = np.radians(df_hydrant["경도"].values)
station_lats = np.radians(df_firestation["위도"].values)
station_lons = np.radians(df_firestation["경도"].values)

# 소화전거리 계산 및 추가
df_building['소화전거리'] = df_building.apply(
    lambda row: haversine_min_distance(row["위도"], row["경도"], hydrant_lats, hydrant_lons),
    axis=1
)
# 소방서거리 계산 및 추가
df_building["소방서거리"] = df_building.apply(
    lambda row: haversine_min_distance(row["위도"], row["경도"], station_lats, station_lons),
    axis=1
)

# 결과 저장
df_building.to_csv('./Data/건축물대장_소화전_소방서거리.csv', index=False)

건축물 종합 점수 산정

코드
from datetime import date
import math, re
import numpy as np
import pandas as pd

def _parse_year(value):
    if value is None:
        return None
    if isinstance(value, float) and math.isnan(value):
        return None
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        return int(value)
    s = str(value).strip()
    if s == "":
        return None
    s_num = s.replace(",", "")
    if re.fullmatch(r"[+-]?\d+(\.\d+)?", s_num):
        try:
            return int(float(s_num))
        except Exception:
            return None
    m = re.search(r"(\d{4})", s)
    if m:
        return int(m.group(1))
    return None

def aging_score(value):
    year = _parse_year(value)
    if year is None:
        return 0
    current_year = date.today().year
    age = current_year - year
    if age < 0 or year < 1800:
        return 0
    if age >= 40:
        return 5
    elif age >= 30:
        return 4
    elif age >= 20:
        return 3
    elif age >= 10:
        return 2
    elif age >= 0:
        return 1
    else:
        return 0

def _parse_floor_count(value):
    if value is None:
        return None
    if isinstance(value, float) and math.isnan(value):
        return None
    if isinstance(value, (int, float)):
        n = int(float(value))
        return n if n >= 0 else None
    s = str(value).strip()
    if s == "":
        return None
    s_num = s.replace(",", "")
    if re.fullmatch(r"[+-]?\d+(\.\d+)?", s_num):
        try:
            n = int(float(s_num))
            return n if n >= 0 else None
        except Exception:
            return None
    m = re.search(r"(-?\d+)", s)
    if m:
        n = int(m.group(1))
        return n if n >= 0 else None
    return None

def aboveground_floors_score(value):
    floors = _parse_floor_count(value)
    if floors is None:
        return 0
    if floors >= 30:
        return 5
    elif floors >= 20:
        return 4
    elif floors >= 10:
        return 3
    elif floors >= 5:
        return 2
    elif floors >= 1:
        return 1
    else:
        return 0

def _parse_basement_floor_count(value):
    if value is None:
        return None
    if isinstance(value, float) and math.isnan(value):
        return None
    if isinstance(value, (int, float)):
        try:
            n = int(float(value))
            return abs(n) if n != 0 else 0
        except Exception:
            return None
    s = str(value).strip()
    if s == "":
        return None
    m = re.fullmatch(r"[Bb]\s*(\d+)\s*[Ff]?", s)
    if m:
        return int(m.group(1))
    m = re.search(r"지하\s*(\d+)", s)
    if m:
        return int(m.group(1))
    m = re.search(r"(-?\d+)", s.replace(",", ""))
    if m:
        try:
            return abs(int(m.group(1)))
        except Exception:
            return None
    return None

def basement_floors_score(value):
    floors = _parse_basement_floor_count(value)
    if floors is None:
        return 0
    if floors >= 3:
        return 3
    elif floors >= 2:
        return 2
    elif floors >= 1:
        return 1
    else:
        return 0

MAIN_USE_SCORE_MAP = {
    '숙박시설': 9.0,
    '야영장시설': 9.0,
    '관광휴게시설': 9.0,
    '공장': 8.0,
    '창고시설': 8.0,
    '노유자시설': 7.0,
    '교육연구시설': 7.0,
    '교육연구및복지시설': 7.0,
    '의료시설': 7.0,
    '수련시설': 7.0,
    '제2종근린생활시설': 5.0,
    '근린생활시설': 5.0,
    '제1종근린생활시설': 5.0,
    '종교시설': 5.0,
    '문화및집회시설': 5.0,
    '운동시설': 5.0,
    '업무시설': 5.0,
    '판매시설': 5.0,
    '위락시설': 5.0,
    '판매및영업시설': 5.0,
    '기타제1종근린생활시설': 5.0,
    '생활편익시설': 5.0,
    '소매점': 5.0,
    '동물및식물관련시설': 4.0,
    '위험물저장및처리시설': 4.0,
    '자원순환관련시설': 4.0,
    '분뇨.쓰레기처리시설': 4.0,
    '방송통신시설': 4.0,
    '자동차관련시설': 4.0,
    '장례시설': 4.0,
    '운수시설': 4.0,
    '교정및군사시설': 4.0,
    '국방,군사시설': 4.0,
    '발전시설': 4.0,
    '묘지관련시설': 4.0,
    '단독주택': 2.0,
    '공동주택': 2.0,
    '다가구주택': 2.0,
    '공공용시설': 1.0,
}

def main_use_score_exact(value) -> float:
    if value is None:
        return 0.0
    if isinstance(value, float) and math.isnan(value):
        return 0.0
    s = str(value).strip()
    if s == "" or s.lower() == "nan":
        return 0.0
    return MAIN_USE_SCORE_MAP.get(s, 0.0)

STRUCTURE_SCORE_MAP = {
    '철근콘크리트구조': 0.0,
    '콘크리트구조': 0.0,
    '프리케스트콘크리트구조': 0.0,
    '보강콘크리트조': 0.0,
    '기타콘크리트구조': 0.0,
    '라멘조': 0.0,
    '일반철골구조': 2.0,
    '경량철골구조': 2.0,
    '강파이프구조': 2.0,
    '철파이프조': 2.0,
    '기타강구조': 2.0,
    '스틸하우스조': 2.0,
    '단일형강구조': 2.0,
    '철골구조': 2.0,
    '공업화박판강구조(PEB)': 2.0,
    '트러스구조': 2.0,
    '철골콘크리트구조': 2.0,
    '철골철근콘크리트구조': 2.0,
    '철골철근콘크리트합성구조': 2.0,
    '기타철골철근콘크리트구조': 2.0,
    '일반목구조': 5.0,
    '목구조': 5.0,
    '통나무구조': 5.0,
    '트러스목구조': 5.0,
    '벽돌구조': 4.0,
    '블록구조': 4.0,
    '시멘트블럭조': 4.0,
    '조적구조': 4.0,
    '기타조적구조': 4.0,
    '석구조': 4.0,
    '흙벽돌조': 4.0,
    '조립식판넬조': 3.0,
    '컨테이너조': 3.0,
    '막구조': 1.0,
    '기타구조': 1.0,
}

def structure_score(value) -> float:
    if value is None:
        return 0.0
    if isinstance(value, float) and math.isnan(value):
        return 0.0
    s = str(value).strip()
    if not s or s.lower() == "nan":
        return 0.0
    if s in STRUCTURE_SCORE_MAP:
        return STRUCTURE_SCORE_MAP[s]
    if ('목' in s) or ('통나무' in s):
        return 5.0
    if ('조적' in s) or ('벽돌' in s) or ('블록' in s) or ('석' in s):
        return 4.0
    if ('조립' in s) or ('판넬' in s) or ('컨테이너' in s):
        return 3.0
    if ('철골' in s) or ('강구조' in s) or ('스틸' in s) or ('파이프' in s):
        return 2.0
    if ('막' in s) or ('특수' in s):
        return 1.0
    if ('콘크리트' in s) or ('라멘' in s):
        return 0.0
    return 0.0

def _parse_nonneg_int_count(value):
    if value is None:
        return None
    if isinstance(value, float) and math.isnan(value):
        return None
    if isinstance(value, (int, float)):
        n = int(float(value))
        return n if n >= 0 else None
    s = str(value).strip()
    if s == "" or s.lower() == "nan":
        return None
    m = re.search(r"(\d+)", s.replace(",", ""))
    if m:
        n = int(m.group(1))
        return n if n >= 0 else None
    return None

def emergency_elevator_score(value) -> float:
    n = _parse_nonneg_int_count(value)
    if n is None:
        return 0.0
    if n == 0:
        return 5.0
    elif n == 1:
        return 4.0
    elif n == 2:
        return 3.0
    elif n == 3:
        return 2.0
    elif n == 4:
        return 1.0
    else:
        return 0.0
    
def firestation_distance_score(dist_m, cap_over_max=True, invalid_to_nan=True):
    arr = np.asarray(dist_m, dtype=float)
    if invalid_to_nan:
        arr = np.where(arr < 0, np.nan, arr)
    default_val = 5.0 if cap_over_max else np.nan
    scores = np.select(
        [arr < 1000, arr < 3000, arr < 5000, arr < 7000, arr < 9000],
        [1.0,        2.0,        3.0,        4.0,        5.0],
        default=default_val
    )
    if np.isscalar(dist_m):
        return float(scores.item())
    if isinstance(dist_m, pd.Series):
        return pd.Series(scores, index=dist_m.index, name=getattr(dist_m, "name", None))
    return scores

def hydrant_distance_score(dist_m, cap_over_max=True, invalid_to_nan=True):
    arr = np.asarray(dist_m, dtype=float)
    if invalid_to_nan:
        arr = np.where(arr < 0, np.nan, arr)
    default_val = 5.0 if cap_over_max else np.nan
    scores = np.select(
        [arr <= 30, arr <= 60, arr <= 90, arr <= 120, arr <= 150],
        [1.0,       2.0,       3.0,       4.0,        5.0],
        default=default_val
    )
    scores = np.where(np.isnan(arr), np.nan, scores)
    if np.isscalar(dist_m):
        return float(np.asarray(scores).item())
    if isinstance(dist_m, pd.Series):
        return pd.Series(scores, index=dist_m.index, name=getattr(dist_m, "name", None))
    return scores

# 점수 산정
df = pd.read_csv("./Data/건축물대장_소화전_소방서거리.csv")
df["건물노후도점수"] = df["사용승인년도"].apply(aging_score)
df["지상층수점수"] = df["지상층수"].apply(aboveground_floors_score)
df["지하층수점수"] = df["지하층수"].apply(basement_floors_score)
df["주용도점수"] = df["주용도코드명"].apply(main_use_score_exact)
df["구조점수"] = df["구조코드명"].apply(structure_score)
df["비상용승강기점수"] = df["비상용승강기수"].apply(emergency_elevator_score)
df["소방서거리점수"] = df["소방서거리"].apply(firestation_distance_score)
df["소화전거리점수"] = df["소화전거리"].apply(hydrant_distance_score)
df["종합점수"] = df["건물노후도점수"] + df["지상층수점수"] + df["지하층수점수"] + df["주용도점수"] + df["구조점수"] + df["비상용승강기점수"] + df["소방서거리점수"] + df["소화전거리점수"]

df.to_csv("./Data/건축물대장_통합_점수.csv")

2) 시각화

119안전센터 및 소화장치 위치 시각화

코드
# 대구광역시 119안전센터 및 소화장치 위치 시각화

# 데이터 출처
# 대구광역시_소방 긴급구조 비상 소화장치 현황
# https://www.data.go.kr/data/15117284/fileData.do

# 소방청_119안전센터 현황
# https://www.data.go.kr/data/15065056/fileData.do

import pandas as pd 
import numpy as np
import plotly.express as px
import plotly.graph_objects as go

loc_119 = pd.read_csv("./Data/대구_소방서_위치.csv")
loc_fire = pd.read_csv("./Data/대구_소방장치_위치.csv")




# 대구광역시 구별 소방 안전센터 시각화 
import json
with open ("./Data/시각화/대구_시군구_군위포함/대구_시군구_군위포함.geojson", encoding='utf-8') as f:
    geojson_data = json.load(f)
# print(geojson_data.keys())

# 구별 소방 안전센터 scatter_mapbox
fig = px.scatter_mapbox(
    loc_119, lat="위도", lon="경도", color="구이름",
    hover_name="119안전센터명",
    hover_data={"위도": False, "경도": False, "구이름": True, "동이름": True},
    zoom=11,
    height=650,
)
fig.update_traces(marker=dict(size=15))

# 구별 소방 긴급구조 비상 소화장치 scatter mapbox
fig.add_trace(go.Scattermapbox(
    lat=loc_fire["위도"],
    lon=loc_fire["경도"],
    mode="markers",
    marker=go.scattermapbox.Marker(size=5, color="blue"),
    name="소화장치",
    hovertemplate="<b>구:</b> %{customdata[0]}<br><b>동:</b> %{customdata[1]}<extra></extra>",
    customdata=loc_fire[["구이름", "동이름"]].values,
))

fig.update_layout(
    mapbox_style="carto-positron",
    mapbox_layers=[
        {
            "sourcetype": "geojson",
            "source": geojson_data,
            "type": "line",
            "color": "green",
            "line": {"width": 1},
        }
    ],
    mapbox_center={"lat": 35.8714, "lon": 128.6014},
    margin={"r":0, "t":30, "l":0, "b":0},
)
fig.show()

소방서, 소화전 시각화

코드
# %% 라이브러리 호출
import pandas as pd
import numpy as np
import plotly.express as px
# %% 데이터 로드
df = pd.read_csv('./Data/건축물대장_통합_점수.csv')
hyd = pd.read_csv('./Raw Data/대구광역시_소화장치_위치데이터.csv')
#firestn = pd.read_csv('대구광역시_소방서_위치데이터.csv', encoding='cp949')
# %%
hydrant_lats = np.radians(hyd["위도"].values)
hydrant_lons = np.radians(hyd["경도"].values)
# %% 거리계산 함수 정의
def haversine_min_distance(lat1, lon1, hy_lats, hy_lons):
    R = 6371000  # 지구 반지름 (m)
    lat1 = np.radians(lat1)
    lon1 = np.radians(lon1)
    
    dlat = hy_lats - lat1
    dlon = hy_lons - lon1
    a = np.sin(dlat / 2)**2 + np.cos(lat1) * np.cos(hy_lats) * np.sin(dlon / 2)**2
    c = 2 * np.arcsin(np.sqrt(a))
    distances = R * c
    return distances.min()
# %% min({소화전거리(m)})
df['소화전거리'] = df.apply(
    lambda row: haversine_min_distance(row["위도"], row["경도"], hydrant_lats, hydrant_lons),
    axis=1
)
# %%
df['소화전거리'].head()
# %% 소방서 데이터
firestation = pd.read_csv('./Data/대구_소방서_위치.csv')
firestation.head()
# %% min({소방서거리(m)})
station_lats = np.radians(firestation["위도"].values)
station_lons = np.radians(firestation["경도"].values)
df["소방서거리"] = df.apply(
    lambda row: haversine_min_distance(row["위도"], row["경도"], station_lats, station_lons),
    axis=1
)

# %% 소방서거리, 소화전거리 분포 시각화

# 소방서거리 분포
fig1 = px.histogram(df, x="소방서거리", nbins=100, title="가장 가까운 소방서 거리 분포", marginal="box")
fig1.update_layout(
    bargap=0.1,
    xaxis_title="거리(m)",
    yaxis_title="건물 수",
    template='plotly_white'
)

fig1.show()

# 소화전거리 분포
fig2 = px.histogram(df, x="소화전거리", nbins=100, title="가장 가까운 소화전 거리 분포", marginal="box")
fig2.update_layout(
    bargap=0.1,
    xaxis_title="거리(m)",
    yaxis_title="건물 수",
    template='plotly_white'
)

fig2.show()

노령 인구 시각화

코드
#======================================
# 노령 인구 비율 시각화
#======================================

# 동별 노령인구 비율 시각화
import pandas as pd 
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
df = pd.read_csv("./Data/동별인구.csv")
new = df[['군·구', '동·읍·면', '고령자_비율','위도','경도']]

# 동별 고령자 비율 값
g2_by_dong = new.groupby(['동·읍·면'])[['고령자_비율']].sum()
g2_by_dong = g2_by_dong.sort_values(by='고령자_비율',ascending=False)
g2_by_dong.rename(columns={'고령자_비율': '고령자_평균비율'}, inplace=True)
g2_by_dong = g2_by_dong.reset_index()
# g2_by_dong.info()

import geopandas as gpd
gdf = gpd.read_file("./Data/시각화/대구_행정동/대구_행정동_군위포함.shp")
print(gdf.crs)
gdf = gdf.to_crs(epsg=4326)
# gdf.to_file("./Data/대구_행정동_군위포함.geojson", driver="GeoJSON")

import json
with open("./Data/시각화/대구_행정동/대구_행정동_군위포함.geojson", encoding='utf-8') as f:
 geojson_data = json.load(f)
# print(geojson_data.keys())
# print(geojson_data['features'][0]['properties'])

# gdf 파일에 유천동이 없고 g2_by_dong 파일에 유천동이 있어 행 삭제
cond = (gdf['ADM_DR_CD'] == '유천동')
gdf[cond]
g2_by_dong.rename(columns={'동·읍·면': 'ADM_DR_NM'}, inplace=True)
cond = (g2_by_dong['ADM_DR_NM'] == '유천동')
g2_by_dong = g2_by_dong.drop(g2_by_dong[cond].index)

# 불로봉무동 이름 변경
g2_by_dong.loc[g2_by_dong['ADM_DR_NM'] == '불로봉무동', 'ADM_DR_NM'] = '불로·봉무동'


# 동별 노령인구 비율 시각화
fig = px.choropleth_mapbox(g2_by_dong,
 geojson=geojson_data,
 locations="ADM_DR_NM",
 featureidkey="properties.ADM_DR_NM",
 color="고령자_평균비율",
 color_continuous_scale="Greens",
 mapbox_style="carto-positron",
 center={"lat":35.87702415809577, "lon":128.58970500739858},
 zoom=10,                
opacity=0.7,               
title="대구광역시 동별 노인평균인구비율"  
)
fig.update_layout(margin={"r":0,"t":30,"l":0,"b":0}) 
fig.show() 


# ===================================
# 구별 고령자 비율 평균
g1_by_gu = new.groupby(['군·구'])[['고령자_비율']].mean()
g1_by_gu = g1_by_gu.reset_index()
g1_by_gu = g1_by_gu.sort_values(by='고령자_비율',ascending=False)
g1_by_gu.rename(columns={'군·구': 'SIGUNGU_NM', '고령자_비율': '고령자_평균비율',}, inplace=True)


import geopandas as gpd
gdf2 = gpd.read_file("./Data/시각화/대구_시군구_군위포함/대구광역시_시군구_군위포함.shp")
print(gdf2.crs)
gdf2 = gdf2.to_crs(epsg=4326)
# gdf2.to_file("./Data/대구_시군구_군위포함.geojson", driver="GeoJSON")

import json
with open("./Data/시각화/대구_시군구_군위포함/대구_시군구_군위포함.geojson", encoding='utf-8') as f:
 geojson_data2 = json.load(f)
print(geojson_data2.keys())

print(geojson_data2['features'][0]['properties'])

# 구별 노령 인구 비율 시각화
fig = px.choropleth_mapbox(g1_by_gu,
 geojson=geojson_data2,
 locations="SIGUNGU_NM",
 featureidkey="properties.SIGUNGU_NM",
 color="고령자_평균비율",
 color_continuous_scale="Greens",
 mapbox_style="carto-positron",
 center={"lat":35.87702415809577, "lon":128.58970500739858},
 zoom=10,                
opacity=0.7,               
title="대구광역시 구별 노인평균인구비율"  
)
fig.update_layout(margin={"r":0,"t":30,"l":0,"b":0}) 
fig.show()
PROJCS["Korea_2000_Korea_Unified_Coordinate_System",GEOGCS["GCS_Korea_2000",DATUM["Korean_Geodetic_Datum_2002",SPHEROID["GRS 1980",6378137,298.257222101,AUTHORITY["EPSG","7019"]],AUTHORITY["EPSG","6737"]],PRIMEM["Greenwich",0],UNIT["Degree",0.0174532925199433]],PROJECTION["Transverse_Mercator"],PARAMETER["latitude_of_origin",38],PARAMETER["central_meridian",127.5],PARAMETER["scale_factor",0.9996],PARAMETER["false_easting",1000000],PARAMETER["false_northing",2000000],UNIT["metre",1,AUTHORITY["EPSG","9001"]],AXIS["Easting",EAST],AXIS["Northing",NORTH]]
PROJCS["Korea_2000_Korea_Unified_Coordinate_System",GEOGCS["GCS_Korea_2000",DATUM["Korean_Geodetic_Datum_2002",SPHEROID["GRS 1980",6378137,298.257222101,AUTHORITY["EPSG","7019"]],AUTHORITY["EPSG","6737"]],PRIMEM["Greenwich",0],UNIT["Degree",0.0174532925199433]],PROJECTION["Transverse_Mercator"],PARAMETER["latitude_of_origin",38],PARAMETER["central_meridian",127.5],PARAMETER["scale_factor",0.9996],PARAMETER["false_easting",1000000],PARAMETER["false_northing",2000000],UNIT["metre",1,AUTHORITY["EPSG","9001"]],AXIS["Easting",EAST],AXIS["Northing",NORTH]]
dict_keys(['type', 'name', 'crs', 'features'])
{'BASE_DATE': '20210630', 'SIGUNGU_CD': '22010', 'SIGUNGU_NM': '중구', 'sgg_code': 22010.0, 'sggcd': 22010}

건축물대장 시각화

코드
# %% 라이브러리 호출
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
# %% check
# columns_to_check = ['Column14', 'Column15', 'Column60', 'Column61', 'Column67']
# %% 구/군 별 데이터 로드
df1 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_군위군.csv')
df2 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_남구.csv')
df3 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_달서구.csv')
df4 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_달성군.csv')
df5 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_동구.csv')
df6 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_북구.csv')
df7 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_서구.csv')
df8 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_수성구.csv')
df9 = pd.read_csv('./Raw Data/건축물대장/건축물대장_대구광역시_중구.csv')

# %% 구/군 컬럼 추가
df1['군/구'] = '군위군'
df2['군/구'] = '남구'
df3['군/구'] = '달서구'
df4['군/구'] = '달성군'
df5['군/구'] = '동구'
df6['군/구'] = '북구'
df7['군/구'] = '서구'
df8['군/구'] = '수성구'
df9['군/구'] = '중구'
# %% 구/군 별 데이터 통합
df_all = pd.concat([df1, df2, df3, df4, df5, df6, df7, df8, df9], ignore_index=True)
# %% 구조 분류 딕셔너리 정의
structure_map = {
    '목조 계열': ['일반목구조', '목구조', '트러스목구조', '통나무구조'],
    '조적식 구조': ['석구조', '벽돌구조', '블록구조', '시멘트블럭조', '흙벽돌조', '조적구조', '기타조적구조'],
    '콘크리트 계열': ['철근콘크리트구조','콘크리트구조','프리케스트콘크리트구조','보강콘크리트조','기타콘크리트구조','라멘조'],
    '철골 계열': ['일반철골구조','경량철골구조','강파이프구조','철파이프조','기타강구조','스틸하우스조','단일형강구조','철골구조','공업화박판강구조(PEB)','트러스구조',
    '철골콘크리트구조','철골철근콘크리트구조','철골철근콘크리트합성구조','기타철골철근콘크리트구조'],
    '조립식·판넬·기타': ['조립식판넬조', '컨테이너조'],
    '기타 / 특수 구조': ['막구조', '기타구조']
}
# %% 구조 분류
def map_structure_type(name):
    for group, items in structure_map.items():
        if name in items:
            return group
    return '미분류'
df_all['구조그룹'] = df_all['구조코드명'].apply(map_structure_type)
# %% 건축 자재별 분포 시각화
structure_counts = df_all['구조그룹'].value_counts()

# 도넛차트 그리기
fig = go.Figure(data=[go.Pie(
    labels=structure_counts.index,
    values=structure_counts.values,
    hole=0.4,
    textinfo='percent+label',
    hoverinfo='label+value+percent',
    insidetextorientation='radial'
)])

fig.update_layout(
    title_text='건축 자재별 건물 분포',
    annotations=[dict(text='', x=0.5, y=0.5, font_size=18, showarrow=False)],
    showlegend=True
)

fig.show()
# %% 주용도 분류 딕셔너리 정의
building_use = {
    '숙박/다중이용시설': ['숙박시설', '야영장시설', '관광휴게시설'],
    '공장/창고시설': ['공장','창고시설'],
    '교육/복지/의료/수련': ['노유자시설', '교육연구시설', '교육연구및복지시설', '의료시설', '수련시설'],
    '상업/판매/문화/업무/근린/생활편익':
    ['제2종근린생활시설',
    '근린생활시설',
    '제1종근린생활시설',
    '종교시설',
    '문화및집회시설',
    '운동시설',
    '업무시설',
    '판매시설',
    '위락시설',
    '판매및영업시설',
    '기타제1종근린생활시설',
    '생활편익시설',
    '소매점'],
    '기반시설':
    ['동물및식물관련시설',
    '위험물저장및처리시설',
    '자원순환관련시설',
    '분뇨.쓰레기처리시설',
    '방송통신시설',
    '자동차관련시설',
    '장례시설',
    '운수시설',
    '교정및군사시설',
    '국방,군사시설',
    '발전시설',
    '묘지관련시설'],
    '주거':
    ['단독주택',
    '공동주택',
    '다가구주택'],
    '행정/공공':
    '공공용시설',
}
# %% 주용도 분류
def use_type(name):
    if not isinstance(name, str):
        return '미분류'

    for group, items in building_use.items():
        if name in items:
            return group
    return '미분류'
df_all['주용도그룹'] = df_all['주용도코드명'].apply(use_type)
# %% 용도별 분포 시각화
use_group_counts = df_all['주용도그룹'].value_counts()
use_group_ratio = use_group_counts / use_group_counts.sum()

# 2% 미만은 기타로 묶기
threshold = 0.02
labels = []
values = []
etc_total = 0

for label, ratio in use_group_ratio.items():
    if ratio >= threshold:
        labels.append(label)
        values.append(use_group_counts[label])
    else:
        etc_total += use_group_counts[label]

# 기타 항목 추가
if etc_total > 0:
    labels.append('기타')
    values.append(etc_total)

# 도넛 차트 생성
fig1 = go.Figure(data=[go.Pie(
    labels=labels,
    values=values,
    hole=0.3,  # 도넛 중앙 구멍 작게 = 도넛 자체 크게
    textinfo='percent+label',
    hoverinfo='label+value+percent',
    insidetextorientation='radial'
)])

# 레이아웃 조정
fig1.update_layout(
    title_text='주용도그룹 분포 (2% 미만 기타로 통합)',
    annotations=[dict(text='주용도', x=0.5, y=0.5, font_size=20, showarrow=False)],
    showlegend=True,
    height=600,  # 높이 늘려서 크게 보기
    width=700
)

fig1.show()
# %% 용도, 자재 교차 분석 시각화
cross_tab = pd.crosstab(df_all['주용도그룹'], df_all['구조그룹'])
fig2 = go.Figure()
for 구조 in cross_tab.columns:
    fig2.add_trace(go.Bar(
        x=cross_tab.index,
        y=cross_tab[구조],
        name=구조
    ))

# 레이아웃 설정
fig2.update_layout(
    barmode='stack',  # 스택형 막대
    title='주용도그룹 vs 구조그룹 분포 (스택형 막대 그래프)',
    xaxis_title='주용도그룹',
    yaxis_title='건물 수',
    legend_title='구조그룹',
    template='plotly_white'
)

fig2.show()
# %% 비상용 승강기 수 분포 시각화
cond_elevator = df_all['지상층수'] >= 5
emergency = df_all[cond_elevator]
# 결측치 0으로 대치
emergency['비상용승강기수'] = emergency['비상용승강기수'].fillna(0).astype(int)

# 5개 이상은 '5개 이상'으로 범주화
def categorize_elevators(x):
    return str(x) if x < 5 else '5개 이상'

emergency['비상용승강기_그룹'] = emergency['비상용승강기수'].apply(categorize_elevators)

# 그룹별 건물 수 집계
grouped = emergency['비상용승강기_그룹'].value_counts().sort_index().reset_index()
grouped.columns = ['비상용승강기수', '건물수']

# 파이차트 시각화 (파이 크기 크게 설정)
fig = px.pie(grouped,
             names='비상용승강기수',
             values='건물수',
             title='지상 5층 이상 건물의 비상용 승강기 수 분포',
             width=700, height=700,  # 파이 크기 조절
             color_discrete_sequence=px.colors.sequential.Magma)

# 퍼센트와 라벨 모두 표시
fig.update_traces(textinfo='percent+label',
                  textfont_size=16,
                  pull=[0.03]*len(grouped))  # 조각 약간 분리(선택)

fig.show()
# %% 사용승인일 이상값 탐색(보충 필요)
df_all['사용승인일_길이'] = df_all['사용승인일'].astype(str).str.len()
df_all['사용승인일_길이'].unique()
cond = df_all['사용승인일_길이'] == 9
df_all[cond]['사용승인일'].unique()
df_year = df_all.copy()
cond_y9 = df_year['사용승인일_길이'] == 9
df_year.loc[cond_y9, '사용승인일'] = '19' + df_year.loc[cond_y9, '사용승인일'].astype(str)
cond_y11 = df_year['사용승인일'] == '191979100.0'
df_year[cond_y11]
df_year.loc[cond_y11, '사용승인일'] = df_year.loc[cond_y11, '사용승인일'].str[2:]
df_year['사용승인일_길이'] = df_year['사용승인일'].astype(str).str.len()
cond_drop = df_year['사용승인일_길이'].isin([2, 3, 5])
df_year = df_year[~cond_drop]
df_year.loc[:, '사용승인일'] = df_year['사용승인일'].astype(str).str.strip()
# %% 사용승인일(년도) 추출
df_year.loc[:, '사용승인일(년도)'] = df_year['사용승인일'].astype(str).str[:4]
df_year['사용승인일(년도)'] = df_year['사용승인일(년도)'].astype(str).str.strip()
df_year['사용승인일(년도)'].replace('', pd.NA, inplace=True)
df_year['사용승인일(년도)'] = pd.to_numeric(df_year['사용승인일(년도)'], errors='coerce').astype('Int64')
# %% 승인연도 필터링, 연령 계산 
cleaned_year = df_year.dropna(subset='사용승인일(년도)')
filltered_year = cleaned_year[cleaned_year['사용승인일(년도)'] >= 1800]
filltered_year['연령'] = 2025 - filltered_year['사용승인일(년도)']
# %% 건축물 연령 분포 시각화
bins = list(range(0, 101, 10)) + [float('inf')]
labels = [f"{i}~{i+10}년" for i in range(0, 100, 10)] + ["100년 이상"]

filltered_year['연령대'] = pd.cut(filltered_year['연령'], bins=bins, labels=labels, right=False)
# 연령대별 건물 수 집계
age_group_counts = filltered_year['연령대'].value_counts().sort_index()
# Plotly로 막대 그래프 시각화

fig5 = px.bar(
    x=age_group_counts.index,
    y=age_group_counts.values,
    labels={'x': '연령대', 'y': '건물 수'},
    title='노후화 구간별 건물 수 분포 (10년 단위)',
    text=age_group_counts.values,
    color=age_group_counts.values,
    color_continuous_scale='Viridis'
)

fig5.update_layout(
    xaxis_title="노후화 구간",
    yaxis_title="건물 수",
    uniformtext_minsize=8,
    uniformtext_mode='hide',
    bargap=0.3
)

fig5.show()
# %% 40년 이상은 한 범주로 처리한 것
bins = [0, 10, 20, 30, 40, float('inf')]
labels = ['0~10년', '10~20년', '20~30년', '30~40년', '40년 이상']

# 2. 구간화
filltered_year['연령대'] = pd.cut(filltered_year['연령'], bins=bins, labels=labels, right=False)

# 3. 집계
age_group_counts = filltered_year['연령대'].value_counts(sort=False)

# 4. 시각화
fig6 = px.bar(
    x=age_group_counts.index,
    y=age_group_counts.values,
    labels={'x': '연령대', 'y': '건물 수'},
    title='노후화 구간별 건물 수 분포 (40년 이상 묶음)',
    text=age_group_counts.values,
    color=age_group_counts.values,
    color_continuous_scale='Viridis'
)

fig6.update_layout(
    xaxis_title="노후화 구간",
    yaxis_title="건물 수",
    uniformtext_minsize=8,
    uniformtext_mode='hide',
    bargap=0.3
)

fig6.show()
# %% 용도, 노후화 교차
bins = [0, 10, 20, 30, 40, float('inf')]
labels = ['0~10년', '10~20년', '20~30년', '30~40년', '40년 이상']
filltered_year['연령대'] = pd.cut(filltered_year['연령'], bins=bins, labels=labels, right=False)

# 2. 교차표 생성: 주용도그룹 × 연령대
cross_tab = pd.crosstab(filltered_year['주용도그룹'], filltered_year['연령대'])

# 3. Plotly로 교차 막대그래프 (그룹별 스택)
fig7 = px.bar(
    cross_tab,
    x=cross_tab.index,
    y=cross_tab.columns,
    labels={'value': '건물 수', '주용도그룹': '주용도 그룹', '연령대': '연령대'},
    title='주용도 그룹별 연령대별 건물 수',
    barmode='stack'  # 누적 막대
)

fig7.update_layout(
    xaxis_title='주용도 그룹',
    yaxis_title='건물 수',
    legend_title='연령대',
    bargap=0.2
)

fig7.show()
# %%

종합점수 분포

코드
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

df = pd.read_csv('./Data/건축물대장_동추가.csv')

plt.rcParams['font.family'] = 'Malgun Gothic'
plt.rcParams['axes.unicode_minus'] = False

# 종합점수 시각화
plt.figure(figsize=(10, 6))
sns.histplot(df["종합점수"], kde=False, bins=50, color="skyblue")
plt.title("종합점수 분포", fontsize=16)
plt.xlabel("종합점수", fontsize=12)
plt.ylabel("건물 수", fontsize=12)
plt.show()